from airflow.models.dag import DAG
from airflow.utils.dates import days_ago
import  datetime
import pendulum

from dagen.operators import SSHOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.sensors.external_task import ExternalTaskMarker


# Intended vehicle model

# Jinja-style template expression: the run's execution_date shifted to the
# Asia/Shanghai timezone and formatted as YYYYMMDD.  It is substituted into
# the job commands below as the Hive variable `pt`.
pt = "{{ execution_date.in_tz('Asia/Shanghai').format('YYYYMMDD') }}"

# Cron expression for the DAG schedule: minute 0, hour 2, every day.
schedule_interval = "0 2 * * *"

# Asia/Shanghai timezone object; passed as tzinfo to the DAG's start_date below.
local_tz = pendulum.timezone("Asia/Shanghai")
# Arguments handed to the DAG as `default_args`.
default_args = dict(
    # Name of the task owner.
    owner='hgc',
    # Whether a task run depends on the state of its own previous run.
    depends_on_past=False,
    # Recipients of notification emails.
    email=['guanghu@tesla.com'],
    # Send an email when a task fails.
    email_on_failure=True,
    # Send an email when a task is retried.
    email_on_retry=True,
    # Number of retries after a failure.
    retries=2,
)


# DAG "cdp_purpose_car_generate": scheduled by `schedule_interval`, starting
# 2021-10-13 in the `local_tz` timezone, with catchup disabled and
# `default_args` as the DAG's default task arguments.
dag = DAG(
    dag_id="cdp_purpose_car_generate",
    default_args=default_args,
    schedule_interval=schedule_interval,
    start_date=datetime.datetime(2021, 10, 13, tzinfo=local_tz),
    catchup=False,
)


# 1. Intended vehicle model.
# Command: start job `dws_cdp_agg_smp_sm_so_td_overwrite` through job.sh,
# passing the templated partition date as Hive variable `pt`.
dwd2dws = (
    'bash -x /mnt/app/eden-dwh/eden-dwh-cdp-tagging/bin/job.sh start'
    ' --job-name dws_cdp_agg_smp_sm_so_td_overwrite'
    f' --sql-params "--hivevar pt={pt}"'
)
# Task that hands the command to SSHOperator on connection `eden_yarn_stg`.
cdp_purpose_car_generate_2dws = SSHOperator(
    task_id="cdp_purpose_car_generate_2dws",
    ssh_conn_id='eden_yarn_stg',
    command=dwd2dws,
    dag=dag,
)

# 2. Most recent vehicle purchase price and cumulative purchase amount.
# Two job.sh invocations (a dwd_* job and a dws_* job), each wrapped in its
# own SSHOperator task on connection `eden_yarn_stg`.
lbcPrice_dwd = (
    'bash -x /mnt/app/eden-dwh/eden-dwh-cdp-tagging/bin/job.sh start'
    ' --job-name dwd_cdp_ble_utopia_vehicleprice_overwrite'
    f' --sql-params "--hivevar pt={pt}"'
)
lbcPrice_dwd_task = SSHOperator(
    task_id="lbcPrice_dwd_task",
    ssh_conn_id='eden_yarn_stg',
    command=lbcPrice_dwd,
    dag=dag,
)

lbcPrice_dws = (
    'bash -x /mnt/app/eden-dwh/eden-dwh-cdp-tagging/bin/job.sh start'
    ' --job-name dws_cdp_agg_ble_utopia_vehicleprice_overwrite'
    f' --sql-params "--hivevar pt={pt}"'
)
lbcPrice_dws_task = SSHOperator(
    task_id="lbcPrice_dws_task",
    ssh_conn_id='eden_yarn_stg',
    command=lbcPrice_dws,
    dag=dag,
)

# 3. Days remaining until the nearest-expiring vehicle insurance.
# Two job.sh invocations (a dwd_* job and a dws_* job), each wrapped in its
# own SSHOperator task on connection `eden_yarn_stg`.
insurance_dwd = (
    'bash -x /mnt/app/eden-dwh/eden-dwh-cdp-tagging/bin/job.sh start'
    ' --job-name dwd_cdp_core_insurancedb_insurance_overwrite'
    f' --sql-params "--hivevar pt={pt}"'
)
insurance_dwd_task = SSHOperator(
    task_id="insurance_dwd_task",
    ssh_conn_id='eden_yarn_stg',
    command=insurance_dwd,
    dag=dag,
)

insurance_dws = (
    'bash -x /mnt/app/eden-dwh/eden-dwh-cdp-tagging/bin/job.sh start'
    ' --job-name dws_cdp_agg_core_insurancedb_insurance_insurancedetail_overwrite'
    f' --sql-params "--hivevar pt={pt}"'
)
insurance_dws_task = SSHOperator(
    task_id="insurance_dws_task",
    ssh_conn_id='eden_yarn_stg',
    command=insurance_dws,
    dag=dag,
)

# 4. Most recent purchase: vehicle model, purchase method and purchase time.
# Two job.sh invocations (a dwd_* job and a dws_* job), each wrapped in its
# own SSHOperator task on connection `eden_yarn_stg`.
lpc_dwd = (
    'bash -x /mnt/app/eden-dwh/eden-dwh-cdp-tagging/bin/job.sh start'
    ' --job-name dwd_cdp_core_orderdb_order_overwrite'
    f' --sql-params "--hivevar pt={pt}"'
)
lpc_dwd_task = SSHOperator(
    task_id="lpc_dwd_task",
    ssh_conn_id='eden_yarn_stg',
    command=lpc_dwd,
    dag=dag,
)

lpc_dws = (
    'bash -x /mnt/app/eden-dwh/eden-dwh-cdp-tagging/bin/job.sh start'
    ' --job-name dws_cdp_agg_core_orderdb_order_purchasedetail_overwrite'
    f' --sql-params "--hivevar pt={pt}"'
)
lpc_dws_task = SSHOperator(
    task_id="lpc_dws_task",
    ssh_conn_id='eden_yarn_stg',
    command=lpc_dws,
    dag=dag,
)

# 5. Age of the most recent vehicle and cumulative vehicle age.
# Two job.sh invocations (a dwd_* job and a dws_* job), each wrapped in its
# own SSHOperator task on connection `eden_yarn_stg`.
lcy_dwd = (
    'bash -x /mnt/app/eden-dwh/eden-dwh-cdp-tagging/bin/job.sh start'
    ' --job-name dwd_cdp_core_vehiclemapdb_vehiclemap_overwrite'
    f' --sql-params "--hivevar pt={pt}"'
)
lcy_dwd_task = SSHOperator(
    task_id="lcy_dwd_task",
    ssh_conn_id='eden_yarn_stg',
    command=lcy_dwd,
    dag=dag,
)

lcy_dws = (
    'bash -x /mnt/app/eden-dwh/eden-dwh-cdp-tagging/bin/job.sh start'
    ' --job-name dws_cdp_agg_core_vehiclemapdb_vehiclemap_vehicledetail_overwrite'
    f' --sql-params "--hivevar pt={pt}"'
)
lcy_dws_task = SSHOperator(
    task_id="lcy_dws_task",
    ssh_conn_id='eden_yarn_stg',
    command=lcy_dws,
    dag=dag,
)

# 6. Whether the customer is a Tesla owner.
# Two job.sh invocations (a dwd_* job and a dws_* job), each wrapped in its
# own SSHOperator task on connection `eden_yarn_stg`.
isTeslaCarOwner_dwd = (
    'bash -x /mnt/app/eden-dwh/eden-dwh-cdp-tagging/bin/job.sh start'
    ' --job-name dwd_cdp_core_contactdb_ownership_overwrite'
    f' --sql-params "--hivevar pt={pt}"'
)
isTeslaCarOwner_dwd_task = SSHOperator(
    task_id="isTeslaCarOwner_dwd_task",
    ssh_conn_id='eden_yarn_stg',
    command=isTeslaCarOwner_dwd,
    dag=dag,
)

isTeslaCarOwner_dws = (
    'bash -x /mnt/app/eden-dwh/eden-dwh-cdp-tagging/bin/job.sh start'
    ' --job-name dws_cdp_agg_core_contactdb_ownership_overwrite'
    f' --sql-params "--hivevar pt={pt}"'
)
isTeslaCarOwner_dws_task = SSHOperator(
    task_id="isTeslaCarOwner_dws_task",
    ssh_conn_id='eden_yarn_stg',
    command=isTeslaCarOwner_dws,
    dag=dag,
)

# 7. Body type preference.
# Command: start job `dws_cdp_agg_core_orderdb_order_ordermodel_overwrite`
# through job.sh, passing the templated partition date as Hive variable `pt`.
om_dws = (
    'bash -x /mnt/app/eden-dwh/eden-dwh-cdp-tagging/bin/job.sh start'
    ' --job-name dws_cdp_agg_core_orderdb_order_ordermodel_overwrite'
    f' --sql-params "--hivevar pt={pt}"'
)
om_dws_task = SSHOperator(
    task_id="om_dws_task",
    ssh_conn_id='eden_yarn_stg',
    command=om_dws,
    dag=dag,
)

# 8. Province and city.
# Two job.sh invocations (a dwd_* job and a dws_* job), each wrapped in its
# own SSHOperator task on connection `eden_yarn_stg`.
pro_city_dwd = (
    'bash -x /mnt/app/eden-dwh/eden-dwh-cdp-tagging/bin/job.sh start'
    ' --job-name dwd_cdp_acm_account_overwrite'
    f' --sql-params "--hivevar pt={pt}"'
)
pro_city_dwd_task = SSHOperator(
    task_id="pro_city_dwd_task",
    ssh_conn_id='eden_yarn_stg',
    command=pro_city_dwd,
    dag=dag,
)

pro_city_dws = (
    'bash -x /mnt/app/eden-dwh/eden-dwh-cdp-tagging/bin/job.sh start'
    ' --job-name dws_cdp_agg_acm_account_profile_overwrite'
    f' --sql-params "--hivevar pt={pt}"'
)
pro_city_dws_task = SSHOperator(
    task_id="pro_city_dws_task",
    ssh_conn_id='eden_yarn_stg',
    command=pro_city_dws,
    dag=dag,
)
# Upstream gate: sensor on task `check_one_id_par_task004` of DAG
# `cdp_create_oneid` (a single task, not the whole DAG).
check_one_id_task004 = ExternalTaskSensor(
    dag=dag,
    task_id='check_one_id_task004',
    # Which external task to watch.
    external_dag_id='cdp_create_oneid',
    external_task_id='check_one_id_par_task004',
    # Look at the external run whose execution date is one hour earlier
    # than this DAG run's execution date.
    execution_delta=datetime.timedelta(hours=1),
    # Only 'success' satisfies the sensor; 'failed' or 'skipped' on the
    # external task are listed as failure states.
    allowed_states=['success'],
    failed_states=['failed', 'skipped'],
    # Ask the sensor to verify that the external DAG/task exists.
    check_existence=True,
    # NOTE(review): timeout is 60 (seconds, per Airflow sensor docs), which
    # leaves a very short waiting window - confirm this is intended.
    timeout=60,
)

# Marker task referring to task `child_task4` in DAG `cdp_integration_dag_gen`.
# NOTE(review): per the Airflow docs an ExternalTaskMarker is normally paired
# with an ExternalTaskSensor of that id in the external DAG - confirm it exists.
par_task4 = ExternalTaskMarker(
    task_id="par_task4",
    external_dag_id="cdp_integration_dag_gen",
    external_task_id="child_task4",
    # Attach to the DAG at construction time, like every other task in this
    # file.  Without this the task only joins the DAG through the `>>` chain,
    # after construction, so the DAG's default_args (owner, email settings,
    # retries) were never applied to it.
    dag=dag,
)

# Wire every task into one strictly sequential chain: the upstream sensor
# first, then each job task in order, and the external marker last.
(
    check_one_id_task004
    >> cdp_purpose_car_generate_2dws
    >> lbcPrice_dwd_task
    >> lbcPrice_dws_task
    >> insurance_dwd_task
    >> insurance_dws_task
    >> lpc_dwd_task
    >> lpc_dws_task
    >> lcy_dwd_task
    >> lcy_dws_task
    >> isTeslaCarOwner_dwd_task
    >> isTeslaCarOwner_dws_task
    >> om_dws_task
    >> pro_city_dwd_task
    >> pro_city_dws_task
    >> par_task4
)